SQL Code Examples
Practical SQL scripts for data engineering, ETL, and data analysis
Incremental Data Load
Use case: incrementally loading data into Snowflake with a MERGE statement. Optimizes performance and reduces cost. (A Python orchestration sketch follows the SQL script.)
- Handles both updates and inserts
- Minimizes data processing
- Built-in audit logging
incremental_load.sql
-- Incremental data load with a MERGE statement
-- Written for Snowflake; adapt DATEADD, CURRENT_DATE() and the temporary table syntax for BigQuery or Synapse
-- Step 1: Create a temporary staging table
CREATE OR REPLACE TEMPORARY TABLE staging_sales AS
SELECT
order_id,
customer_id,
product_id,
quantity,
amount,
order_date,
'batch_' || CURRENT_DATE() AS batch_id,
CURRENT_TIMESTAMP() AS loaded_at
FROM external_sales_source
WHERE order_date >= DATEADD(day, -1, CURRENT_DATE());
-- Step 2: MERGE statement for the incremental load
MERGE INTO production.sales_fact AS target
USING staging_sales AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN
UPDATE SET
target.quantity = source.quantity,
target.amount = source.amount,
target.updated_at = CURRENT_TIMESTAMP(),
target.is_active = TRUE
WHEN NOT MATCHED THEN
INSERT (
order_id,
customer_id,
product_id,
quantity,
amount,
order_date,
created_at,
updated_at,
is_active
) VALUES (
source.order_id,
source.customer_id,
source.product_id,
source.quantity,
source.amount,
source.order_date,
CURRENT_TIMESTAMP(),
CURRENT_TIMESTAMP(),
TRUE
);
-- Step 3: Soft delete old records (optional)
UPDATE production.sales_fact
SET is_active = FALSE,
updated_at = CURRENT_TIMESTAMP()
WHERE order_date < DATEADD(month, -13, CURRENT_DATE())
AND is_active = TRUE;
-- Step 4: Audit logging
-- Note: metadata$action is only available on Snowflake streams, not on a plain
-- staging table, so insert/update counts are derived from the merged rows instead:
-- rows inserted by the MERGE have created_at = updated_at, updated rows do not.
INSERT INTO audit.data_load_log (
    table_name,
    rows_inserted,
    rows_updated,
    rows_deleted,
    load_timestamp,
    batch_id
)
SELECT
    'sales_fact' AS table_name,
    COUNT(CASE WHEN f.created_at = f.updated_at THEN 1 END) AS rows_inserted,
    COUNT(CASE WHEN f.created_at <> f.updated_at THEN 1 END) AS rows_updated,
    0 AS rows_deleted,
    CURRENT_TIMESTAMP() AS load_timestamp,
    'batch_' || CURRENT_DATE() AS batch_id
FROM production.sales_fact f
JOIN staging_sales s ON f.order_id = s.order_id;
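The script above hard-codes a one-day lookback window. Below is a minimal Python sketch of how the staging step could be orchestrated with a parameterized watermark; it assumes the snowflake-connector-python package, and the connection values and function name are illustrative placeholders.
# Hypothetical orchestration sketch for the incremental load above.
# Assumes snowflake-connector-python; connection values are placeholders.
from datetime import date, timedelta

import snowflake.connector

def run_incremental_load(lookback_days: int = 1) -> None:
    """Rebuild the staging table for a parameterized watermark window."""
    watermark = date.today() - timedelta(days=lookback_days)
    conn = snowflake.connector.connect(
        account="my_account",   # placeholder
        user="etl_user",        # placeholder
        password="***",         # placeholder
        warehouse="ETL_WH",
        database="PRODUCTION",
    )
    try:
        cur = conn.cursor()
        # Recreate the staging table for the chosen window (watermark bound via %s)
        cur.execute(
            """
            CREATE OR REPLACE TEMPORARY TABLE staging_sales AS
            SELECT order_id, customer_id, product_id, quantity, amount, order_date,
                   'batch_' || CURRENT_DATE() AS batch_id,
                   CURRENT_TIMESTAMP() AS loaded_at
            FROM external_sales_source
            WHERE order_date >= %s
            """,
            (watermark,),
        )
        # The MERGE, soft-delete and audit statements from incremental_load.sql
        # can be executed the same way, one statement per execute() call.
    finally:
        conn.close()

if __name__ == "__main__":
    run_incremental_load(lookback_days=1)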
Data Quality Checks
Use case: comprehensive data quality checks for a customer database. Checks completeness, validity, and consistency. (A Python sketch that turns these checks into a pipeline gate follows the SQL script.)
- NULL value checks
- Domain validation
- Referential integrity
- Data profiling
data_quality_checks.sql
-- Data quality checks for customer data
-- Written for PostgreSQL; adapt the ~ regex operator (MySQL: REGEXP) and CURRENT_DATE (SQL Server) when porting
-- 1. Completeness checks (NULL/empty values)
WITH completeness_checks AS (
SELECT
'customers' AS table_name,
COUNT(*) AS total_rows,
COUNT(customer_id) AS customer_id_not_null,
COUNT(first_name) AS first_name_not_null,
COUNT(email) AS email_not_null,
COUNT(CASE WHEN email LIKE '%@%.%' THEN 1 END) AS valid_email_format,
COUNT(CASE WHEN phone ~ '^[0-9+\-\s()]{10,20}$' THEN 1 END) AS valid_phone
FROM production.customers
WHERE is_active = TRUE
),
completeness_pct AS (
SELECT
table_name,
total_rows,
ROUND(100.0 * customer_id_not_null / total_rows, 2) AS customer_id_complete_pct,
ROUND(100.0 * first_name_not_null / total_rows, 2) AS first_name_complete_pct,
ROUND(100.0 * email_not_null / total_rows, 2) AS email_complete_pct,
ROUND(100.0 * valid_email_format / email_not_null, 2) AS email_valid_pct,
ROUND(100.0 * valid_phone / total_rows, 2) AS phone_valid_pct
FROM completeness_checks
)
SELECT * FROM completeness_pct
-- 2. Uniqueness Checks
UNION ALL
SELECT
'customers_uniqueness' AS table_name,
total_rows,
NULL AS customer_id_complete_pct,
NULL AS first_name_complete_pct,
NULL AS email_complete_pct,
NULL AS email_valid_pct,
ROUND(100.0 * unique_emails / total_rows, 2) AS email_unique_pct
FROM (
SELECT
COUNT(*) AS total_rows,
COUNT(DISTINCT email) AS unique_emails
FROM production.customers
WHERE email IS NOT NULL
) t
-- 3. Referential Integrity Check
UNION ALL
SELECT
'referential_integrity' AS table_name,
COUNT(*) AS orphaned_records,
NULL, NULL, NULL, NULL,
ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM production.orders), 2) AS orphaned_pct
FROM production.orders o
LEFT JOIN production.customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL
-- 4. Business Rule Validation
UNION ALL
SELECT
'business_rules' AS table_name,
COUNT(*) AS rule_violations,
NULL, NULL, NULL, NULL,
ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM production.orders), 2) AS violation_pct
FROM production.orders o
JOIN production.customers c ON o.customer_id = c.customer_id
WHERE o.order_amount < 0
OR o.order_date > CURRENT_DATE
OR (c.country = 'NL' AND o.order_amount > 10000);
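The checks above return percentages per rule; in a pipeline you usually want to fail the run when a threshold is breached. Below is a minimal Python sketch of such a quality gate, assuming SQLAlchemy and pandas; the connection URL, the thresholds, and reading the query from data_quality_checks.sql are illustrative assumptions.
# Hypothetical quality gate around the checks above.
# Assumes SQLAlchemy + pandas; connection URL, thresholds and file path are placeholders.
from pathlib import Path

import pandas as pd
from sqlalchemy import create_engine, text

THRESHOLDS = {
    "email_complete_pct": 95.0,  # example threshold, tune per dataset
    "email_valid_pct": 99.0,
}

def run_quality_gate(db_url: str = "postgresql://user:***@host/dbname") -> bool:
    """Run the checks and return False when any threshold is breached."""
    engine = create_engine(db_url)
    sql = Path("data_quality_checks.sql").read_text()
    with engine.connect() as conn:
        results = pd.read_sql(text(sql), conn)
    # The first UNION branch ('customers') carries the completeness percentages
    completeness = results[results["table_name"] == "customers"].iloc[0]
    failures = [
        (metric, completeness[metric], minimum)
        for metric, minimum in THRESHOLDS.items()
        if completeness[metric] < minimum
    ]
    for metric, value, minimum in failures:
        print(f"FAILED: {metric} = {value}% (minimum {minimum}%)")
    return not failures

if __name__ == "__main__":
    if not run_quality_gate():
        raise SystemExit(1)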
Python Code Examples
Python scripts for data processing, ETL pipelines, and data quality
ETL Pipeline Template
Use case: complete ETL pipeline template with error handling, logging, and configuration management.
- Configuration from YAML (example config after the script)
- Error handling via a custom ETLError exception
- Logging to both console and file
- Modular design
etl_pipeline.py
#!/usr/bin/env python3
"""
ETL Pipeline Template for Data Engineering
"""
import yaml
import logging
from datetime import datetime
from typing import Dict, Any
import pandas as pd
from pathlib import Path
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('etl_pipeline.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class ETLError(Exception):
"""Custom exception voor ETL fouten"""
pass
class ETLPipeline:
"""Basis ETL pipeline klasse"""
def __init__(self, config_path: str = "config/etl_config.yaml"):
self.config = self.load_config(config_path)
self.stats = {
'start_time': None,
'end_time': None,
'rows_processed': 0,
'rows_failed': 0,
'errors': []
}
    def load_config(self, config_path: str) -> Dict[str, Any]:
        """Load configuration from a YAML file"""
        try:
            with open(config_path, 'r') as f:
                config = yaml.safe_load(f)
            logger.info(f"Config loaded from {config_path}")
            return config
        except Exception as e:
            logger.error(f"Error while loading config: {e}")
            raise ETLError(f"Loading config failed: {e}")
    def extract(self) -> pd.DataFrame:
        """Extract data from the source"""
        try:
            source_type = self.config['source']['type']
            source_path = self.config['source']['path']
            logger.info(f"Extracting data from {source_type}: {source_path}")
            if source_type == 'csv':
                df = pd.read_csv(source_path, dtype=str)
            elif source_type == 'parquet':
                df = pd.read_parquet(source_path)
            elif source_type == 'json':
                df = pd.read_json(source_path, lines=True)
            else:
                raise ValueError(f"Unknown source type: {source_type}")
            logger.info(f"Successfully extracted {len(df)} rows")
            return df
        except Exception as e:
            logger.error(f"Extract failed: {e}")
            raise ETLError(f"Extract phase failed: {e}")
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform data according to business rules"""
        try:
            transformations = self.config.get('transformations', {})
            # Data cleaning
            if transformations.get('drop_duplicates'):
                df = df.drop_duplicates()
                logger.info(f"Duplicates removed, {len(df)} rows remaining")
            # Type conversions
            for col, dtype in transformations.get('type_conversions', {}).items():
                if col in df.columns:
                    if dtype == 'datetime':
                        df[col] = pd.to_datetime(df[col], errors='coerce')
                    elif dtype == 'numeric':
                        df[col] = pd.to_numeric(df[col], errors='coerce')
            logger.info(f"Transform complete, {len(df)} rows transformed")
            return df
        except Exception as e:
            logger.error(f"Transform failed: {e}")
            raise ETLError(f"Transform phase failed: {e}")
    def load(self, df: pd.DataFrame):
        """Load data to the target"""
        try:
            target_type = self.config['target']['type']
            target_path = self.config['target']['path']
            logger.info(f"Loading data to {target_type}: {target_path}")
            # Create the target directory if it does not exist yet
            Path(target_path).parent.mkdir(parents=True, exist_ok=True)
            if target_type == 'parquet':
                df.to_parquet(target_path, index=False)
            elif target_type == 'csv':
                df.to_csv(target_path, index=False)
            else:
                raise ValueError(f"Unknown target type: {target_type}")
            logger.info(f"Successfully loaded {len(df)} rows to {target_path}")
        except Exception as e:
            logger.error(f"Load failed: {e}")
            raise ETLError(f"Load phase failed: {e}")
    def run(self):
        """Run the complete ETL pipeline"""
        self.stats['start_time'] = datetime.now()
        logger.info("=" * 50)
        logger.info("ETL pipeline started")
        logger.info("=" * 50)
        try:
            # Extract
            raw_data = self.extract()
            # Transform
            transformed_data = self.transform(raw_data)
            # Load
            self.load(transformed_data)
            # Update statistics
            self.stats['rows_processed'] = len(transformed_data)
            self.stats['end_time'] = datetime.now()
            duration = self.stats['end_time'] - self.stats['start_time']
            logger.info("=" * 50)
            logger.info("ETL pipeline completed successfully!")
            logger.info(f"Duration: {duration}")
            logger.info(f"Rows processed: {self.stats['rows_processed']}")
            logger.info("=" * 50)
            return True
        except ETLError as e:
            logger.error(f"ETL pipeline failed: {e}")
            self.stats['errors'].append(str(e))
            return False
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            self.stats['errors'].append(str(e))
            return False
if __name__ == "__main__":
    # Example usage
    pipeline = ETLPipeline("config/etl_config.yaml")
    success = pipeline.run()
    if success:
        print("ETL pipeline completed successfully!")
    else:
        print("ETL pipeline failed, see logs for details")
PySpark Code Examples
PySpark scripts for big data processing and distributed computing
Performance Optimization
Use case: advanced performance tuning for PySpark jobs. Optimizes partitioning, caching, and query execution.
- Dynamic partition pruning
- Broadcast joins for small tables (see the sketch after the script)
- Delta Lake optimizations
- Memory management
performance_optimization.py
"""
PySpark Performance Optimization Template
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Initialize Spark session with optimizations
spark = SparkSession.builder \
.appName("PerformanceOptimization") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.autoBroadcastJoinThreshold", "104857600") \
.getOrCreate()
class SparkOptimizer:
"""Spark performance optimizer"""
def __init__(self, spark_session):
self.spark = spark_session
    def repartition_optimally(self, df, partition_column=None, num_partitions=None):
        """
        Repartition a DataFrame optimally for further processing
        """
        current_partitions = df.rdd.getNumPartitions()
        if partition_column:
            # Partition on the join/group column to avoid a later shuffle
            df = df.repartition(num_partitions or 200, partition_column)
        elif num_partitions:
            # Explicit number of partitions
            df = df.repartition(num_partitions)
        else:
            # Adaptive partitioning based on data size (rough estimate of ~100 bytes per row)
            data_size_mb = df.count() * 100 / (1024 * 1024)
            optimal_partitions = max(1, int(data_size_mb / 128))
            df = df.repartition(optimal_partitions)
        print(f"Repartitioned from {current_partitions} to {df.rdd.getNumPartitions()} partitions")
        return df
# Example: e-commerce analytics pipeline
def ecommerce_analytics_pipeline():
    """End-to-end analytics pipeline with performance optimizations"""
    optimizer = SparkOptimizer(spark)
    # 1. Read data
    print("📥 Step 1: Loading data...")
    orders_df = spark.read.format("delta").load("/mnt/data/orders")
    customers_df = spark.read.format("delta").load("/mnt/data/customers")
    # 2. Filter and transform
    print("🔄 Step 2: Transforming data...")
    recent_orders = orders_df.filter(col("order_date") >= "2024-01-01")
    # Repartition on the join key for better performance
    recent_orders = optimizer.repartition_optimally(recent_orders, "customer_id")
    customers_df = optimizer.repartition_optimally(customers_df, "customer_id")
    # 3. Join and aggregate
    print("🤝 Step 3: Joins and aggregations...")
    enriched_orders = recent_orders.join(customers_df, "customer_id", "inner")
    customer_metrics = enriched_orders.groupBy("customer_id", "country").agg(
        count("*").alias("total_orders"),
        sum("order_amount").alias("total_spent"),
        avg("order_amount").alias("avg_order_value")
    )
    print(f"✅ Pipeline completed successfully! Result: {customer_metrics.count()} rows")
    return customer_metrics
if __name__ == "__main__":
    print("🚀 PySpark Performance Optimization Demo")
    print("=" * 50)
    # Run the pipeline
    try:
        results = ecommerce_analytics_pipeline()
        # Show sample results
        print("\n📋 Sample results:")
        results.show(10, truncate=False)
    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        spark.stop()
        print("\n🎯 Spark session stopped")